package mylab.starters.mq.impl.ons.tcp;

import com.aliyun.openservices.ons.api.ONSFactory;
import com.google.common.base.Charsets;
import mylab.starters.mq.api.Message;
import mylab.starters.mq.api.Properties;
import mylab.starters.mq.config.MqPropperties;
import mylab.starters.mq.impl.AbstractProducer;
import mylab.starters.mq.impl.ons.OnsUtil;


/**
 * Producer implementation that publishes messages through the Aliyun ONS
 * client obtained from {@link ONSFactory#createProducer(java.util.Properties)}.
 *
 * <p>The underlying ONS producer is created and started in {@link #onStart()},
 * used by {@link #onSend(Message)}, and shut down and released in
 * {@link #onStop()}.
 *
 * <p>Created by sylar on 2017/1/6.
 */
public class OnsTcpProducer extends AbstractProducer {

    /** Underlying ONS producer; {@code null} until started and again after being stopped. */
    private com.aliyun.openservices.ons.api.Producer producer;

    /**
     * Creates a producer; all arguments are forwarded unchanged to {@link AbstractProducer}.
     *
     * @param properties configuration; {@link #onStart()} casts it to
     *                   {@link MqPropperties.OnsTcpProperties}, so it must be of that type
     * @param groupId    group id, later passed to {@code OnsUtil.tcpProperties}
     * @param clientId   client id, forwarded to the superclass
     */
    public OnsTcpProducer(Properties properties, String groupId, String clientId) {
        super(properties, groupId, clientId);
    }

    /**
     * Builds the ONS client properties, then creates and starts the underlying producer.
     *
     * @throws ClassCastException if the configured properties are not
     *                            {@link MqPropperties.OnsTcpProperties}
     * @throws Exception          if creating or starting the ONS producer fails
     */
    @Override
    protected void onStart() throws Exception {
        // Named distinctly from the inherited "properties" field to avoid shadowing it.
        java.util.Properties onsProperties =
                OnsUtil.tcpProperties((MqPropperties.OnsTcpProperties) this.properties, groupId);
        producer = ONSFactory.createProducer(onsProperties);
        producer.start();
    }

    /**
     * Shuts down the underlying producer and releases the reference to it.
     *
     * <p>Does nothing when no producer exists (never started, or already stopped),
     * so calling this more than once is safe.
     */
    @Override
    protected void onStop() {
        final com.aliyun.openservices.ons.api.Producer current = producer;
        if (current == null) {
            return;
        }
        try {
            current.shutdown();
        } finally {
            // Drop the reference even if shutdown fails so a stale producer is never reused.
            producer = null;
        }
    }

    /**
     * Converts the given message into an ONS message (topic, tag, key and the
     * UTF-8 encoded content) and hands it to the underlying producer.
     *
     * @param message message to publish; its content must not be {@code null}
     * @return the value returned by the ONS producer's {@code send} call
     * @throws IllegalStateException if the producer has not been started or was stopped
     * @throws Exception             if the ONS client fails to send the message
     */
    @Override
    protected Object onSend(Message message) throws Exception {
        final com.aliyun.openservices.ons.api.Producer current = producer;
        if (current == null) {
            throw new IllegalStateException("ONS producer is not started");
        }
        byte[] body = message.getContent().getBytes(Charsets.UTF_8);
        com.aliyun.openservices.ons.api.Message onsMessage =
                new com.aliyun.openservices.ons.api.Message(
                        message.getTopic(),
                        message.getTag(),
                        message.getKey(),
                        body);
        return current.send(onsMessage);
    }
}